package JavaSpark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Action操作
 */
@SuppressWarnings(value = {"unused", "unchecked"})
public class JavaActionOperation {

    public static void main(String[] args) {
        //reduce();
        //collect();
        //count();
        //take();
        //saveAsTextFile();
        countByKey();
    }

    private static void reduce() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("reduce")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        //构造集合
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        //并行化集合，创建初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        //使用reduce操作对集合中的数字进行累加
        //reduce操作的原理：
        //将第一个和第二个元素，传入call（）方法，进行计算，会获取一个结果
        //接着将该结果与下一个元素传入call()方法，进行计算
        //以此类推
        //reduce操作的本质：就是聚合，将多个元素聚合成一个元素
        int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(sum);
        //关闭JavaSparkContext
        sc.close();
    }

    private static void collect() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("collect")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        //构造集合
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        //并行化集合，创建初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        //使用map操作将集合中所有数字乘以2
        JavaRDD<Integer> doubleNumbers = numberRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 * 2;
            }
        });
        //不用foreach action操作，在远程集群上遍历RDD中的元素
        //使用collect操作，将分布在远程集群上的doubleNumber RDD的数据拉取到本地
        //这种方式，一般不建议使用，因为如果RDD中的数据量笔记大，比如过万条
        //性能会比较差，因为要从远程走大量的网络传输，将数据获取到本地
        //此外，还可能在RDD中数据量特别大的情况下，发生oom异常，内存溢出
        //因此，通常还是使用foreach action操作，来对最终的元素进行处理
        List<Integer> doubleNumberList = doubleNumbers.collect();
        for (Integer num : doubleNumberList) {
            System.out.println(num);
        }
        //关闭JavaSparkContext
        sc.close();
    }

    private static void count() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("count")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        //构造集合
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        //并行化集合，创建初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        //对RDD使用count操作，统计它有多少个元素
        long count = numberRDD.count();
        System.out.println(count);
        //关闭JavaSparkContext
        sc.close();
    }

    private static void take() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("take")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        //构造集合
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        //并行化集合，创建初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        //对RDD使用take操作
        //take与collect类似，从远程集群上，获取RDD数据
        //collect是获取RDD的所有数据，take知识获取前n个数据
        List<Integer> top3Numbers = numberRDD.take(3);
        for (Integer num : top3Numbers) {
            System.out.println(num);
        }
        //关闭JavaSparkContext
        sc.close();
    }

    private static void saveAsTextFile() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("saveAsTextFile")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        //构造集合
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        //并行化集合，创建初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        //使用map操作将集合中所有数字乘以2
        JavaRDD<Integer> doubleNumbers = numberRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 * 2;
            }
        });
        //直接将RDD中的数据，保存在文件中
        doubleNumbers.saveAsTextFile("");
        //关闭JavaSparkContext
        sc.close();
    }

    private static void countByKey() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("countByKey")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        //构造集合
        List<Tuple2<String, String>> scoresList = Arrays.asList(
                new Tuple2<>("class1", "tom"),
                new Tuple2<>("class2", "jack"),
                new Tuple2<>("class1", "leo"),
                new Tuple2<>("class2", "marry"));
        //并行化集合，创建JavaPairRDD
        JavaPairRDD<String, String> students = sc.<String, String>parallelizePairs(scoresList);
        //对RDD应用countByKey操作，统计每个班级的学生人数，就是统计每个key对应的元素个数
        //countByKey返回的类型，直接就是Map<String,Object>
        Map<String, Long> studentCounts = students.countByKey();
        for (Map.Entry<String, Long> studentCount : studentCounts.entrySet()) {
            System.out.println(studentCount.getKey() + "：" + studentCount.getValue());
        }
        //关闭JavaSparkContext
        sc.close();
    }
}
